package com.live.spaces.shanboli;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Enumeration;

import javax.swing.JTable;
import javax.swing.JTextArea;
import javax.swing.table.DefaultTableModel;

import net.jxta.discovery.DiscoveryService;
import net.jxta.document.Advertisement;
import net.jxta.document.AdvertisementFactory;
import net.jxta.document.MimeMediaType;
import net.jxta.document.StructuredTextDocument;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.StringMessageElement;
import net.jxta.exception.PeerGroupException;
import net.jxta.id.IDFactory;
import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupFactory;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.OutputPipe;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.pipe.PipeMsgListener;
import net.jxta.pipe.PipeService;
import net.jxta.platform.ModuleClassID;
import net.jxta.protocol.ModuleClassAdvertisement;
import net.jxta.protocol.ModuleSpecAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.socket.JxtaServerSocket;
import net.jxta.socket.JxtaSocket;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class JxtaHelper implements PipeMsgListener {

	// Verbosity level for the System.out diagnostics. The checks in this class
	// are written as "6 < DEBUGLEVEL", so with the value 5 they are disabled.
	private final static int DEBUGLEVEL = 5;

	// The JXTA net peer group; assigned in startJxta().
	private PeerGroup netPeerGroup = null;

	// Pipe advertisement of the remote data socket found by
	// prepareDataSocket(); sendFile() connects to it.
	private PipeAdvertisement dataOutPipeAdv;

	// Our own server socket for binary data; created in prepareDataSocket()
	// and accepted on in receiveFile().
	private JxtaServerSocket dataInSocket;

	// Pipe advertisement of the remote message pipe found by
	// prepareMsgPipe(); sendMsg() opens an output pipe to it.
	private PipeAdvertisement msgOutPipeAdv;

	// Discovery service of netPeerGroup; assigned in startJxta().
	private DiscoveryService discovery;

	// Pipe service of netPeerGroup; assigned in startJxta().
	private PipeService pipes;

	private final static Logger LOG = Logger.getLogger(JxtaHelper.class
			.getName());

	// Used in the advertisement names ("JXTAMOD:"/"JXTASPEC:" prefix) and as
	// both namespace and name of the message element carrying the text.
	private final static String msgPipeName = "MSG_PIPE";

	// Used in the advertisement names of the binary data socket.
	private final static String dataSocketName = "DATA_PIPE";

	// Table handed in through the constructor; prepareMsgPipe() replaces this
	// reference with a new JTable once a peer has been found.
	public JTable tableOnShow;

	// Model of tableOnShow; one row is added per discovered peer.
	public DefaultTableModel model;

	// Scratch row for the table model: [0] is the part of the remote pipe
	// description before '#', [1] the part after it (published by peers as
	// nickName + "#" + ip).
	public Object[] newPeer = new Object[2];

	// Name published in the description of our message pipe advertisement.
	private String nickName;

	// Local host address published next to nickName; keeps this default when
	// the lookup in prepareMsgPipe() fails.
	private String ip = "unknow";

	// Text area that receives incoming chat messages; only set by the
	// three-argument constructor.
	private JTextArea textArea;

	/**
	 * Creates a helper that is not attached to any UI component. The table,
	 * its model, the nickname and the text area all stay {@code null}.
	 */
	public JxtaHelper() {
		super();
	}

	/**
	 * Creates a helper that lists discovered peers in the given table but has
	 * no text area for incoming messages.
	 *
	 * @param table
	 *            table whose model (a DefaultTableModel) receives one row per
	 *            discovered peer
	 * @param name
	 *            nickname published to other peers
	 */
	public JxtaHelper(JTable table, String name) {
		this(table, name, null);
	}

	/**
	 * Creates a helper wired to both the peer table and the chat text area.
	 *
	 * @param table
	 *            table whose model (a DefaultTableModel) receives one row per
	 *            discovered peer
	 * @param name
	 *            nickname published to other peers
	 * @param mainTextArea
	 *            text area that incoming messages are appended to
	 */
	public JxtaHelper(JTable table, String name, JTextArea mainTextArea) {
		this.tableOnShow = table;
		this.model = (DefaultTableModel) table.getModel();
		this.textArea = mainTextArea;
		this.nickName = name;
	}

	/**
	 * Prepares the bidirectional message pipe.
	 * <p>
	 * Publishes a module class and a module spec advertisement for the control
	 * message pipe (locally and remotely), opens an input pipe with this object
	 * as its listener and then blocks until the message pipe of another peer
	 * has been discovered. That pipe is remembered in msgOutPipeAdv and the
	 * peer is appended to the table model.
	 * <p>
	 * Returns early if the pipe advertisement template cannot be loaded or if
	 * the waiting thread is interrupted.
	 */
	private void prepareMsgPipe() {
		// advertise that we have a pipe to listen on control messages
		ModuleClassAdvertisement mcadv = (ModuleClassAdvertisement) AdvertisementFactory
				.newAdvertisement(ModuleClassAdvertisement
						.getAdvertisementType());
		mcadv.setName("JXTAMOD:" + msgPipeName);
		mcadv.setDescription("Fileshare Module Advertisement");
		ModuleClassID mcID = IDFactory.newModuleClassID();
		mcadv.setModuleClassID(mcID);

		// Remote publishing lets the others know that we are there and offer
		// a pipe; local publishing is needed as well, otherwise the local
		// runtime is not informed when someone wants to reach us on the
		// InputPipe.
		try {
			discovery.publish(mcadv);
		} catch (Exception e) {
			LOG.error("Could not publish the module class advertisement for "
					+ msgPipeName, e);
		}
		discovery.remotePublish(mcadv);

		ModuleSpecAdvertisement msAdv = (ModuleSpecAdvertisement) AdvertisementFactory
				.newAdvertisement(ModuleSpecAdvertisement
						.getAdvertisementType());
		msAdv.setName("JXTASPEC:" + msgPipeName);
		msAdv.setVersion("Version 0.9");
		msAdv.setCreator("Shanbo Li");
		msAdv.setModuleSpecID(IDFactory.newModuleSpecID(mcID));
		msAdv.setSpecURI("http://www.knechtel.eu/filesharing/messagepipe");

		// Load the advertisement template. A missing resource yields a null
		// stream, which is reported like a read failure instead of an NPE.
		PipeAdvertisement pipeAdv = null;
		InputStream is = getClass().getResourceAsStream("./msgpipe.adv");
		if (is == null) {
			System.err.println("Faild to read/parse pipe advertisement");
			return;
		}
		try {
			try {
				pipeAdv = (PipeAdvertisement) AdvertisementFactory
						.newAdvertisement(MimeMediaType.XMLUTF8, is);
			} finally {
				// close the stream on the failure path as well
				is.close();
			}
		} catch (IOException ioe) {
			System.err.println("Faild to read/parse pipe advertisement");
			return;
		}
		if (pipeAdv == null) {
			System.err.println("Faild to read/parse pipe advertisement");
			return;
		}

		// each time a new pipe id, otherwise the peer sends to itself if it
		// holds the same pipe id listening
		pipeAdv.setPipeID(IDFactory.newPipeID(netPeerGroup.getPeerGroupID()));

		try {
			ip = InetAddress.getLocalHost().getHostAddress();
		} catch (UnknownHostException e1) {
			// keep the default value of ip
			LOG.warn("Could not determine the local host address", e1);
		}

		// other peers split this description on '#' to fill their peer table
		pipeAdv.setDescription(nickName + "#" + ip);
		msAdv.setPipeAdvertisement(pipeAdv);

		if (6 < DEBUGLEVEL) {
			// display the advertisement as a plain text document.
			StructuredTextDocument doc = (StructuredTextDocument) msAdv
					.getDocument(MimeMediaType.XMLUTF8);
			try {
				StringWriter out = new StringWriter();
				doc.sendToWriter(out);
				System.out.println(out.toString());
				out.close();
			} catch (IOException ioe) {
				LOG.warn("Could not print the module spec advertisement", ioe);
			}
		}

		try {
			discovery.publish(msAdv);
		} catch (Exception e) {
			LOG.error("Could not publish the module spec advertisement for "
					+ msgPipeName, e);
		}
		discovery.remotePublish(msAdv);

		// create the input pipe and listen on it
		try {
			pipes.createInputPipe(pipeAdv, this);
		} catch (IOException ioe) {
			System.err.println("Could not create input pipe and listen on it.");
		}

		awaitForeignMsgPipe(pipeAdv);
	}

	/**
	 * Polls the discovery service until a message pipe advertised by another
	 * peer is found, or until the current thread is interrupted.
	 *
	 * @param ownPipeAdv
	 *            the advertisement we published ourselves; advertisements
	 *            with the same pipe id are skipped
	 */
	private void awaitForeignMsgPipe(PipeAdvertisement ownPipeAdv) {
		System.out
				.print("searching for a listening pipe of a peer for control messages");
		String ownPipeId = ownPipeAdv.getPipeID().toString();
		boolean pipefound = false;
		while (!pipefound) {
			try {
				// search remotely and fill our cache
				discovery.getRemoteAdvertisements(null, DiscoveryService.ADV,
						"Name", "JXTASPEC:" + msgPipeName, 10, null);

				// let's look in our local cache to see if we have it; we
				// just take the first foreign pipe
				Enumeration en = discovery.getLocalAdvertisements(
						DiscoveryService.ADV, "Name", "JXTASPEC:"
								+ msgPipeName);
				while (!pipefound && en != null && en.hasMoreElements()) {
					pipefound = adoptMsgPipeIfForeign(en.nextElement(),
							ownPipeId);
				}
			} catch (IOException e) {
				// found nothing! move on
				if (LOG.isEnabledFor(Level.DEBUG)) {
					LOG.debug("Reading the local advertisement cache failed", e);
				}
			}

			if (!pipefound) {
				// Discovery is asynchronous, so wait before polling again.
				// The pause is outside the try block so that a failing cache
				// lookup cannot turn this loop into a busy loop.
				try {
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					// give up waiting and leave the interrupt flag set
					Thread.currentThread().interrupt();
					System.out.println();
					return;
				}
			}
			System.out.print(".");
		}
		System.out.println();
	}

	/**
	 * Checks one discovered advertisement and, if it describes the message
	 * pipe of another peer, stores it in msgOutPipeAdv and adds the peer to
	 * the table model.
	 *
	 * @param candidate
	 *            element taken from the discovery enumeration
	 * @param ownPipeId
	 *            string form of our own pipe id
	 * @return true if the candidate was adopted as the outgoing message pipe
	 */
	private boolean adoptMsgPipeIfForeign(Object candidate, String ownPipeId) {
		if (!(candidate instanceof ModuleSpecAdvertisement)) {
			return false;
		}
		PipeAdvertisement found;
		try {
			// we can find the pipe to connect to the service in the
			// advertisement
			found = ((ModuleSpecAdvertisement) candidate)
					.getPipeAdvertisement();
			if (found == null || found.getPipeID() == null) {
				return false;
			}
			// is the found pipe the one we advertised ourselves?
			if (ownPipeId.equals(found.getPipeID().toString())) {
				return false;
			}
		} catch (Exception ex) {
			LOG.error("Client: Error discovering remote MSG pipe.", ex);
			return false;
		}

		// only now overwrite the field, so it never points at our own pipe
		msgOutPipeAdv = found;

		// the description is expected to be "<nickname>#<ip>"
		String description = found.getDescription();
		String[] parts = (description == null) ? new String[0] : description
				.split("#");
		newPeer[0] = (parts.length > 0) ? parts[0] : "unknown";
		newPeer[1] = (parts.length > 1) ? parts[1] : "unknown";
		if (model != null) {
			model.addRow(newPeer);
			// NOTE(review): this replaces the table reference with a new,
			// undisplayed JTable; kept because the field is public, but the
			// model update above is what a displayed table reacts to.
			tableOnShow = new JTable(model);
		}
		return true;
	}

	/**
	 * Handles a message arriving on our message pipe.
	 * <p>
	 * The text is read from the message element named after the message pipe
	 * and interpreted as follows:
	 * <ul>
	 * <li>"get &lt;name&gt;" sends the named file from the share directory
	 * over the data socket; names that resolve outside the share directory
	 * are refused,</li>
	 * <li>"ls" answers with the listing of the share directory,</li>
	 * <li>anything else is printed on the console and appended to the text
	 * area, if one was supplied.</li>
	 * </ul>
	 * NOTE(review): this is presumably called on a JXTA thread rather than the
	 * Swing event thread - confirm whether the text area update needs
	 * invokeLater.
	 *
	 * @param event
	 *            message event
	 */
	public void pipeMsgEvent(PipeMsgEvent event) {
		System.out.print("\nIncoming message on our MSG pipe: ");
		try {
			// grab the message from the event
			Message msg = event.getMessage();
			if (msg == null) {
				if (LOG.isEnabledFor(Level.DEBUG)) {
					LOG.debug("Received an empty message, returning");
				}
				return;
			}
			if (LOG.isEnabledFor(Level.DEBUG)) {
				LOG.debug("Received a response");
			}

			// the element itself can be missing, not only its text
			MessageElement msgElement = msg.getMessageElement(msgPipeName,
					msgPipeName);
			String msgStr = (msgElement == null) ? null : msgElement
					.toString();
			if (msgStr == null) {
				System.out.println("null msg received");
				return;
			}

			// "./share" relative to the working directory, built with the
			// platform separator instead of a hard-coded backslash
			File shareDir = new File(".", "share");

			if (msgStr.startsWith("get ")) {
				String requestedName = msgStr.substring(4);
				File requested = resolveSharedFile(shareDir, requestedName);
				if (requested == null) {
					// the name comes from a remote peer, never follow it
					// outside of the share directory
					System.err
							.println("refusing to send file outside of the share directory: "
									+ requestedName);
				} else {
					sendFile(requested.getPath());
				}
			} else if (msgStr.equals("ls")) {
				// list our files in the share directory; list() returns null
				// when the directory does not exist
				String[] entries = shareDir.list();
				if (entries == null) {
					entries = new String[0];
				}
				sendMsg(Arrays.toString(entries));
			} else {
				System.out.println(msgStr);
				// textArea is only set by the three-argument constructor
				if (textArea != null) {
					textArea.append(msgStr + "\n");
				}
			}
		} catch (Exception e) {
			LOG.error("Could not process the incoming message", e);
		}
	}

	/**
	 * Resolves a file name requested by a remote peer against the share
	 * directory.
	 *
	 * @param shareDir
	 *            the directory whose content may be sent
	 * @param requestedName
	 *            the name as received from the remote peer
	 * @return the file inside shareDir, or null if the canonical path of the
	 *         requested name is not located below shareDir
	 * @throws IOException
	 *             if a canonical path cannot be determined
	 */
	private File resolveSharedFile(File shareDir, String requestedName)
			throws IOException {
		File candidate = new File(shareDir, requestedName);
		String basePath = shareDir.getCanonicalPath();
		String candidatePath = candidate.getCanonicalPath();
		if (candidatePath.startsWith(basePath + File.separator)) {
			return candidate;
		}
		return null;
	}

	/**
	 * Sends a text message to the peer whose message pipe was discovered by
	 * prepareMsgPipe().
	 * <p>
	 * A new output pipe is opened for every call (resolve timeout 3000 ms) and
	 * closed again afterwards. Failures are logged, not thrown.
	 *
	 * @param msgString
	 *            the text to send
	 */
	public void sendMsg(String msgString) {
		if (msgOutPipeAdv == null) {
			System.err
					.println("Cannot send message: no remote message pipe discovered yet.");
			return;
		}

		OutputPipe outpipe = null;
		try {
			// the receiver looks the text up under msgPipeName as both
			// namespace and element name
			Message msg = new Message();
			msg.addMessageElement(msgPipeName, new StringMessageElement(
					msgPipeName, msgString, null));
			System.out.println("Sending: " + msgString);

			outpipe = pipes.createOutputPipe(msgOutPipeAdv, 3000);
			outpipe.send(msg);
		} catch (Exception ie) {
			LOG.error("Could not send message: " + msgString, ie);
		} finally {
			// release the pipe, a fresh one is created per message
			if (outpipe != null) {
				outpipe.close();
			}
		}
	}

	/**
	 * Prepares the socket pair used for binary file transfer.
	 * <p>
	 * Publishes a module class and a module spec advertisement for the data
	 * socket (locally and remotely), opens our own server socket and then
	 * blocks until the data socket of another peer has been discovered. That
	 * advertisement is remembered in dataOutPipeAdv.
	 * <p>
	 * Returns early if the pipe advertisement template cannot be loaded or if
	 * the waiting thread is interrupted.
	 */
	private void prepareDataSocket() {
		// advertise that we have a data socket to listen on binary data
		ModuleClassAdvertisement mcadv = (ModuleClassAdvertisement) AdvertisementFactory
				.newAdvertisement(ModuleClassAdvertisement
						.getAdvertisementType());
		mcadv.setName("JXTAMOD:" + dataSocketName);
		mcadv
				.setDescription("Fileshare Module Advertisement for binary data socket.");
		ModuleClassID mcID = IDFactory.newModuleClassID();
		mcadv.setModuleClassID(mcID);

		// Remote publishing lets the others know that we are there and offer
		// a socket; local publishing is needed as well, otherwise the local
		// runtime is not informed when someone wants to reach us on the
		// socket.
		try {
			discovery.publish(mcadv);
		} catch (Exception e) {
			LOG.error("Could not publish the module class advertisement for "
					+ dataSocketName, e);
		}
		discovery.remotePublish(mcadv);

		ModuleSpecAdvertisement msAdv = (ModuleSpecAdvertisement) AdvertisementFactory
				.newAdvertisement(ModuleSpecAdvertisement
						.getAdvertisementType());
		msAdv.setName("JXTASPEC:" + dataSocketName);
		msAdv.setVersion("Version 0.9");
		msAdv.setCreator("Christian Sell & Martin Knechtel");
		msAdv.setModuleSpecID(IDFactory.newModuleSpecID(mcID));
		msAdv.setSpecURI("http://www.knechtel.eu/filesharing/datasocket");

		// Load the advertisement template. A missing resource yields a null
		// stream, which is reported like a read failure instead of an NPE.
		PipeAdvertisement pipeAdv = null;
		InputStream is = getClass().getResourceAsStream("./datasocket.adv");
		if (is == null) {
			System.err.println("Faild to read/parse pipe advertisement");
			return;
		}
		try {
			try {
				pipeAdv = (PipeAdvertisement) AdvertisementFactory
						.newAdvertisement(MimeMediaType.XMLUTF8, is);
			} finally {
				// close the stream on the failure path as well
				is.close();
			}
		} catch (IOException ioe) {
			System.err.println("Faild to read/parse pipe advertisement");
			return;
		}
		if (pipeAdv == null) {
			System.err.println("Faild to read/parse pipe advertisement");
			return;
		}

		// each time a new pipe id, otherwise the peer sends to itself if it
		// holds the same pipe id listening
		pipeAdv.setPipeID(IDFactory.newPipeID(netPeerGroup.getPeerGroupID()));
		msAdv.setPipeAdvertisement(pipeAdv);

		if (6 < DEBUGLEVEL) {
			// display the advertisement as a plain text document.
			StructuredTextDocument doc = (StructuredTextDocument) msAdv
					.getDocument(MimeMediaType.XMLUTF8);
			try {
				StringWriter out = new StringWriter();
				doc.sendToWriter(out);
				System.out.println(out.toString());
				out.close();
			} catch (IOException ioe) {
				LOG.warn("Could not print the module spec advertisement", ioe);
			}
		}

		try {
			discovery.publish(msAdv);
		} catch (Exception e) {
			LOG.error("Could not publish the module spec advertisement for "
					+ dataSocketName, e);
		}
		discovery.remotePublish(msAdv);

		// without this socket receiveFile() cannot accept connections, but
		// sending files still works, so discovery continues either way
		try {
			dataInSocket = new JxtaServerSocket(netPeerGroup, pipeAdv);
		} catch (IOException e1) {
			LOG.error("Could not open the server socket for binary data", e1);
		}

		awaitForeignDataSocket(pipeAdv);
	}

	/**
	 * Polls the discovery service until a data socket advertised by another
	 * peer is found and stored in dataOutPipeAdv, or until the current thread
	 * is interrupted.
	 *
	 * @param ownPipeAdv
	 *            the advertisement we published ourselves; advertisements
	 *            with the same pipe id are skipped
	 */
	private void awaitForeignDataSocket(PipeAdvertisement ownPipeAdv) {
		System.out
				.print("searching for a listening socket of a peer for binary data");
		String ownPipeId = ownPipeAdv.getPipeID().toString();
		boolean pipefound = false;
		while (!pipefound) {
			try {
				// search remotely and fill our cache
				discovery.getRemoteAdvertisements(null, DiscoveryService.ADV,
						"Name", "JXTASPEC:" + dataSocketName, 10, null);

				// let's look in our local cache to see if we have it; we
				// just take the first foreign socket
				Enumeration en = discovery.getLocalAdvertisements(
						DiscoveryService.ADV, "Name", "JXTASPEC:"
								+ dataSocketName);
				while (!pipefound && en != null && en.hasMoreElements()) {
					Object candidate = en.nextElement();
					if (!(candidate instanceof ModuleSpecAdvertisement)) {
						continue;
					}
					try {
						// we can find the pipe to connect to the service in
						// the advertisement
						PipeAdvertisement found = ((ModuleSpecAdvertisement) candidate)
								.getPipeAdvertisement();
						// skip the socket we advertised ourselves and only
						// then overwrite the field
						if (found != null
								&& found.getPipeID() != null
								&& !ownPipeId.equals(found.getPipeID()
										.toString())) {
							dataOutPipeAdv = found;
							pipefound = true;
						}
					} catch (Exception ex) {
						LOG.error(
								"Client: Error discovering remote data socket.",
								ex);
					}
				}
			} catch (IOException e) {
				// found nothing! move on
				if (LOG.isEnabledFor(Level.DEBUG)) {
					LOG.debug("Reading the local advertisement cache failed", e);
				}
			}

			if (!pipefound) {
				// Discovery is asynchronous, so wait before polling again.
				// The pause is outside the try block so that a failing cache
				// lookup cannot turn this loop into a busy loop.
				try {
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					// give up waiting and leave the interrupt flag set
					Thread.currentThread().interrupt();
					System.out.println();
					return;
				}
			}
			System.out.print(".");
		}
		System.out.println();
	}

	/**
	 * Waits for one incoming connection on our data socket and writes
	 * everything received on it to the given file.
	 * <p>
	 * Blocks until a peer connects. The file and the connection are closed in
	 * all cases; I/O failures are logged, not thrown.
	 *
	 * @param fileName
	 *            path of the file to create or overwrite
	 */
	public void receiveFile(String fileName) {

		System.out
				.println("listening for incoming binary stream to write it to file "
						+ fileName);

		// check before opening the file, which would truncate it
		if (dataInSocket == null) {
			System.err
					.println("no data socket available to receive file: "
							+ fileName);
			return;
		}

		FileOutputStream output = null;
		try {
			output = new FileOutputStream(new File(fileName));
		} catch (FileNotFoundException e1) {
			System.err.println("could not open file to write: " + fileName);
			return;
		}

		Socket socket = null;
		try {
			socket = dataInSocket.accept();
			InputStream in = socket.getInputStream();

			// never use an empty buffer, read() would then return 0 forever
			byte[] buffer = new byte[Math.max(socket.getReceiveBufferSize(),
					1024)];
			int bytesReceived;
			while ((bytesReceived = in.read(buffer)) != -1) {
				output.write(buffer, 0, bytesReceived);
			}
		} catch (IOException e) {
			LOG.error("Receiving file " + fileName + " failed", e);
		} finally {
			try {
				output.close();
			} catch (IOException e) {
				LOG.warn("Could not close file " + fileName, e);
			}
			if (socket != null) {
				try {
					socket.close();
				} catch (IOException e) {
					LOG.warn("Could not close the data connection", e);
				}
			}
		}
	}

	/**
	 * Connects to the data socket discovered by prepareDataSocket() and
	 * streams the given file to it.
	 * <p>
	 * The connection is opened first (timeout 10 seconds) and is closed again
	 * even when the file cannot be opened, so the remote side sees the end of
	 * the stream. Failures are reported on the console or logged, not thrown.
	 *
	 * @param fileName
	 *            path of the file to send
	 */
	public void sendFile(String fileName) {
		System.out.println("Sending file " + fileName);
		System.out.println("Establishing data connection");

		if (dataOutPipeAdv == null) {
			System.err
					.println("no remote data socket discovered yet, cannot send file: "
							+ fileName);
			return;
		}

		JxtaSocket socket = null;
		try {
			socket = new JxtaSocket(netPeerGroup, dataOutPipeAdv,
			// timeout 10 seconds
					10000);
		} catch (IOException e) {
			// without a connection there is nothing left to do
			System.err.println("timed out.");
			return;
		}

		FileInputStream input = null;
		try {
			input = new FileInputStream(new File(fileName));
			OutputStream out = socket.getOutputStream();

			// never use an empty buffer, read() would then return 0 forever
			byte[] nextBytes = new byte[Math.max(socket.getSendBufferSize(),
					1024)];
			int bytesRead;
			while ((bytesRead = input.read(nextBytes)) != -1) {
				out.write(nextBytes, 0, bytesRead);
				System.out.println(".");
			}
			out.flush();
			System.out.println("finished");
		} catch (FileNotFoundException e) {
			System.err.println("file not found: " + fileName);
		} catch (IOException e) {
			LOG.error("Sending file " + fileName + " failed", e);
		} finally {
			if (input != null) {
				try {
					input.close();
				} catch (IOException e) {
					LOG.warn("Could not close file " + fileName, e);
				}
			}
			try {
				socket.close();
			} catch (IOException e) {
				LOG.warn("Could not close the data connection", e);
			}
		}
	}

	/**
	 * Starts JXTA: creates the default net peer group, fetches its discovery
	 * and pipe services, flushes stale advertisements from the local cache and
	 * then sets up the data socket followed by the message pipe.
	 * <p>
	 * Terminates the JVM with exit code 1 if the peer group cannot be created.
	 */
	public void startJxta() {
		final boolean verbose = DEBUGLEVEL > 6;

		// create and start the default jxta NetPeerGroup
		try {
			netPeerGroup = PeerGroupFactory.newNetPeerGroup();
		} catch (PeerGroupException groupFailure) {
			// could not instantiate the group, print the stack and exit
			System.out.println("fatal error : group creation failure");
			groupFailure.printStackTrace();
			System.exit(1);
		}

		if (verbose) {
			System.out.println("Getting DiscoveryService");
		}
		discovery = netPeerGroup.getDiscoveryService();

		if (verbose) {
			System.out.println("Getting PipeService");
		}
		pipes = netPeerGroup.getPipeService();

		// clean the local cache from advertisements of earlier runs
		try {
			flushLocalCache();
		} catch (IOException flushFailure) {
			System.err
					.println("flushing local cache of advertisements was not successful. "
							+ "Please do it manually (remove directory /.jxta/cm)");
		}

		// the data socket is prepared before the message pipe
		prepareDataSocket();
		prepareMsgPipe();
	}

	/**
	 * Removes the cached module spec advertisements of the message pipe and of
	 * the data socket from the local discovery cache.
	 * <p>
	 * On startup the cache still holds entries from the last run, which are
	 * no longer up to date because the other peers may have left the network
	 * or shut down.
	 *
	 * @throws IOException
	 *             if the cache cannot be read or an advertisement cannot be
	 *             flushed
	 */
	public void flushLocalCache() throws IOException {
		// message pipe first, data socket second
		String[] specNames = { "JXTASPEC:" + msgPipeName,
				"JXTASPEC:" + dataSocketName };

		for (int i = 0; i < specNames.length; i++) {
			Enumeration cached = discovery.getLocalAdvertisements(
					DiscoveryService.ADV, "Name", specNames[i]);
			if (cached == null || !cached.hasMoreElements()) {
				continue;
			}

			LOG.log(Level.INFO,
					"Flush local cache : flushing advertisements... ");
			do {
				Advertisement stale = (Advertisement) cached.nextElement();
				discovery.flushAdvertisement(stale);
			} while (cached.hasMoreElements());
		}
	}

}